package com.coupon.distribution.service.impl;

import com.alibaba.fastjson.JSON;
import com.coupon.common.constant.Constants;
import com.coupon.common.exception.CouponException;
import com.coupon.common.vo.CouponTemplateSDK;
import com.coupon.common.vo.GoodsInfo;
import com.coupon.common.vo.SettlementInfo;
import com.coupon.distribution.constant.CouponStatusEnum;
import com.coupon.distribution.dao.CouponDao;
import com.coupon.distribution.entity.Coupon;
import com.coupon.distribution.feign.CouponTemplateClient;
import com.coupon.distribution.feign.SettlementClient;
import com.coupon.distribution.service.RedisService;
import com.coupon.distribution.service.UserService;
import com.coupon.distribution.vo.AcquireTemplateRequest;
import com.coupon.distribution.vo.CouponClassify;
import com.coupon.distribution.vo.CouponKafkaMessageVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.*;
import java.util.stream.Collectors;

/**
 * @author 王哲
 * @Contact 1121586359@qq.com
 * @ClassName UserServiceImpl.java
 * @create 2023年06月26日 下午8:17
 * @Description 用户服务相关的接口实现
 * 所有的操作过程，状态都保存在Redis中，并通过kafka把消息传递到MySQL中
 * @Version V1.0
 */
@Slf4j
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private CouponDao couponDao;

    @Autowired
    private RedisService redisService;

    @Resource
    private CouponTemplateClient couponTemplateClient;

    @Resource
    private SettlementClient settlementClient;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // 用户三类状态优惠券信息展示服务
    @Override
    public List<Coupon> findCouponByStatus(Long userId, Integer status) throws CouponException {
        // 从Redis中获取优惠券信息
        List<Coupon> cachedCoupons = redisService.getCachedCoupons(userId, status);
        List<Coupon> preTarget;

        if (CollectionUtils.isNotEmpty(cachedCoupons)) {
            log.debug("coupon cache is not empty: {},{}", userId, status);
            preTarget = cachedCoupons;
        } else {
            log.debug("coupon cache is empty, get coupon from db: {},{}", userId, status);
            List<Coupon> couponList =
                    couponDao.findAllByUserIdAndStatus(userId, CouponStatusEnum.of(status));
            // 如果数据库中没有记录，直接返回就可以，Cache中已经加入了一张无效的优惠券
            if (CollectionUtils.isEmpty(couponList)) {
                log.debug("current user do not have coupon: {},{}", userId, status);
                return couponList;
            }
            // 填充db中的couponTemplateSDK字段
            Map<Integer, CouponTemplateSDK> ids2TemplateSDK = couponTemplateClient.findIds2TemplateSDK(couponList.stream()
                    .map(Coupon::getTemplateId)
                    .collect(Collectors.toList())).getData();

            couponList.forEach(coupon -> {
                coupon.setCouponTemplateSDK(ids2TemplateSDK.get(coupon.getTemplateId()));
            });

            // 将记录写入Redis
            redisService.addCouponToCache(userId, couponList, status);

            preTarget = couponList;
        }

        // 将无效优惠券剔除
        preTarget = preTarget.stream()
                .filter(c -> c.getId() != -1)
                .collect(Collectors.toList());

        // 如果当前获取的是可用优惠券，还需要做对已过期优惠券的延迟处理
        if (CouponStatusEnum.of(status) == CouponStatusEnum.USABLE) {
            CouponClassify couponClassify = CouponClassify.classify(preTarget);
            // 如果已过期状态不为空，需要做延迟处理
            if (CollectionUtils.isNotEmpty(couponClassify.getExpired())) {
                log.info("Add Expired Coupons To Cache From FindCouponByStatus: {},{}", userId, status);
                redisService.addCouponToCache(userId,
                        couponClassify.getExpired(),
                        CouponStatusEnum.EXPIRED.getCode()
                );

                // 发送到kafka中做异步处理
                kafkaTemplate.send(Constants.TOPIC,
                        JSON.toJSONString(new CouponKafkaMessageVO(
                                CouponStatusEnum.EXPIRED.getCode(),
                                couponClassify.getExpired().stream()
                                        .map(Coupon::getId)
                                        .collect(Collectors.toList())
                        )));
            }

            return couponClassify.getUsable();
        }

        return preTarget;
    }

    // 查看用户当前可以领取的优惠券模板
    @Override
    public List<CouponTemplateSDK> findAvailableTemplate(Long userId) throws CouponException {

        Long curDate = new Date().getTime();

        List<CouponTemplateSDK> couponTemplateSDKList = couponTemplateClient.findAllUsableTemplate().getData();

        log.debug("Find All Template(From TemplateClient) Count: {}", couponTemplateSDKList.size());

        // 过滤过期的优惠券模板
        couponTemplateSDKList = couponTemplateSDKList.stream()
                .filter(t -> t.getRule().getExpiration().getDeadline() > curDate)
                .collect(Collectors.toList());

        log.info("Find Usable Template Count: {}", couponTemplateSDKList.size());
        // key是TemplateId value中的key是Template limitation，value是优惠券模板
        HashMap<Integer, Pair<Integer, CouponTemplateSDK>> integerPairHashMap = new HashMap<>(couponTemplateSDKList.size());

        couponTemplateSDKList.forEach(t -> integerPairHashMap.put(t.getId(),
                Pair.of(t.getRule().getLimitation(), t)));


        List<CouponTemplateSDK> result = new ArrayList<>(integerPairHashMap.size());
        List<Coupon> userUsableCoupons = findCouponByStatus(userId, CouponStatusEnum.USABLE.getCode());
        log.debug("Current User Has Usable Coupons: {},{}", userId, userUsableCoupons.size());

        // key是TemplateId
        Map<Integer, List<Coupon>> templateId2Coupons = userUsableCoupons.stream()
                .collect(Collectors.groupingBy(Coupon::getTemplateId));

        // 根据Template的Rule判断是否可以领取优惠券模板
        integerPairHashMap.forEach((k, v) -> {
            // 获取优惠券模板的限制
            Integer limitation = v.getLeft();
            // 获取优惠券模板
            CouponTemplateSDK couponTemplateSDK = v.getRight();

            if (templateId2Coupons.containsKey(k) && templateId2Coupons.get(k).size() >= limitation) {
                return;
            }
            result.add(couponTemplateSDK);
        });

        return result;
    }

    // 用户领取优惠券服务

    /**
     * 1.从TemplateClient拿到对应的优惠券模板,并检查是否过期
     * 2.根据limitation判断用户是否可以领取
     * 3.save to db
     * 4.填充CouponTemplateSDK
     * 5.save to cache
     *
     * @param request
     * @return
     * @throws CouponException
     */
    @Override
    public Coupon acquireTemplate(AcquireTemplateRequest request) throws CouponException {
        // 获取优惠券模板
        Map<Integer, CouponTemplateSDK> id2Template = couponTemplateClient.findIds2TemplateSDK(
                Collections.singletonList(request.getTemplateSDK().getId())
        ).getData();

        // 优惠券模板是否存在
        if (id2Template.size() <= 0) {
            log.error("Can Not Acquire Template From TemplateClient: {}", request.getTemplateSDK().getId());
            throw new CouponException("Can Not Acquire Template From TemplateClient");
        }

        // 判断用户是否可以领取优惠券
        List<Coupon> couponByStatus = findCouponByStatus(request.getUserId(), CouponStatusEnum.USABLE.getCode());

        Map<Integer, List<Coupon>> templateId2Coupons = couponByStatus.stream()
                .collect(Collectors.groupingBy(Coupon::getTemplateId));

        if (templateId2Coupons.containsKey(request.getTemplateSDK().getId())
                && templateId2Coupons.get(request.getTemplateSDK().getId()).size() >=
                request.getTemplateSDK().getRule().getLimitation()) {
            log.error("Exceed Template Assign Limitation: {}", request.getTemplateSDK().getId());
            throw new CouponException("Exceed Template Assign Limitation");
        }

        // 尝试去获取优惠券码
        String code = redisService.tryToAcquireCouponCodeFromCache(request.getTemplateSDK().getId());
        if (StringUtils.isEmpty(code)) {
            log.error("Can Not Acquire Coupon Code: {}", request.getTemplateSDK().getId());
            throw new CouponException("Can Not Acquire Coupon Code");
        }

        Coupon coupon = new Coupon(
                request.getTemplateSDK().getId(),
                request.getUserId(),
                code,
                CouponStatusEnum.USABLE
        );

        // 保存到数据库
        coupon = couponDao.save(coupon);

        // 填充Coupon对象的CouponTemplateSDK，一定要在放入缓存之前填充
        coupon.setCouponTemplateSDK(request.getTemplateSDK());

        // 放入缓存中
        redisService.addCouponToCache(request.getUserId(),
                Collections.singletonList(coupon),
                CouponStatusEnum.USABLE.getCode());


        return coupon;
    }

    // 用户消费优惠券服务
    @Override
    public SettlementInfo settlement(SettlementInfo info) throws CouponException {
        // 当没有传递优惠券时，直接返回商品总价
        List<SettlementInfo.CouponAndTemplateInfo> couponAndTemplateInfos = info.getCouponAndTemplateInfos();
        if (CollectionUtils.isEmpty(couponAndTemplateInfos)) {
            log.info("Empty Coupons For Settle.");
            double goodsSum = 0.0;
            for (GoodsInfo goodsInfo : info.getGoodsInfos()) {
                goodsSum += goodsInfo.getPrice() * goodsInfo.getCount();
            }
            // 没有优惠券也就没有优惠券的核销，所以直接返回商品总价
            info.setCost(retain2Decimals(goodsSum));
        }

        // 校验传递的优惠券是否是用户自己的
        List<Coupon> coupons = findCouponByStatus(info.getUserId(), CouponStatusEnum.USABLE.getCode());
        List<Integer> collect = coupons.stream().map(s -> s.getId()).collect(Collectors.toList());

        couponAndTemplateInfos.stream().map(s -> s.getId()).forEach(s -> {
            if (!collect.contains(s)) {
                log.error("Coupon is not User's: {}", s);
                try {
                    throw new CouponException("Coupon is not User's");
                } catch (CouponException e) {
                    log.error("Error:{}", e.getMessage());
                }
            }
        });

        // 获取结算信息
        info = settlementClient.computeRule(info).getData();


        return info;
    }

    /**
     * 保留两位小数
     *
     * @param goodsSum
     * @return
     */
    private Double retain2Decimals(double goodsSum) {
        return new BigDecimal(goodsSum).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
    }
}
